通过源码对SparkShuffle深入解析
☝点击上方蓝字,关注我们!
概述
Shuffle过程中数据需要根据分区规则重新进行分配,这其中涉及到磁盘IO和网络传输,因此它是分布式计算的天敌,在内存、CPU等硬件资源越来越充足,数据本地计算的效率越来越高的情况下,Shuffle性能往往就决定了分布式任务的整体效率。Spark是一个高性能的分布式计算引擎,它的Shuffle实现方式从早期的Hash Based Shuffle一步一步优化,形成了目前Write端的三种实现方式:SortShuffleWriter、BypassMergeSortShuffleWriter、UnsafeShuffleWriter以及Read端统一实现方式:BlockStoreShuffleReader。Shuffle是一个非常复杂的过程,任何一个环节都值得深入分析,本文将从Spark的Shuffle演变历史开始,再结合生产环境版本的源码对目前Spark shuffle的主要过程进行分析。
Spark Shuffle的演变史
1.Spark 0.8之前 - Hash Based Shuffle
这种方式实现起来简单粗暴,但是在大数据量环境下会存在两个很明显的问题:
1)每个MapTask会产生ReduceTask数个临时文件,每个文件里是同一个Reduce分区的数据,在有X个MapTask,Y个ReduceTask的任务中将产生X*Y个临时文件,这在大数据、多任务的集群环境下会对机器的文件系统造成很大的压力。
2)Map端会开辟X*Y个内存缓冲区来溢写临时文件,在内存资源不充足的环境中很容易导致内存溢出。
2. Spark 0.8.1为Hash Based Shuffle加入了File Consolidation机制
针对Hash Based Shuffle带来的问题,在0.8.1版本进行了优化,为Executor的每个Core开辟ReduceTask个数的内存缓存区,每个Core将Map端的数据写入同一个临时文件,在有Z个Core、X个MapTask、Y个ReduceTask的任务中,这样临时文件个数减少为Z * Y个,每个文件里是同一个Reduce分区的数据。
这种方式能一定程度上缓解内存压力、减少临时文件个数,但当RedueTask过多的时候,Z*Y任然很大。
3. Spark 0.9 引入ExternalAppendOnlyMap
这是一个当内存不足时用来溢写数据到磁盘的数据结构,API注释如下:
* An append-only map that spills sorted content to disk when there is insufficient space for it to grow.
4. Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
Spark开始寻求替代Hash Based Shuffle的方案,借鉴了Mapreduce 的Shuffle实现方式,引入了Sort Based Shuffle,虽然不是默认方式,但是可以通过将参数spark.shuffle.manager设置为 sort来手动选择使用Sort Based Shuffle。
5.Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle
将Sort Based Shuffle设置为默认方式,这种方式使用ExternalSorter来处理Map数据,对于每一个MapTask只会产生两个临时文件,一个是数据文件,另一个是记录数据中不同Reduce分区数据位置的索引文件。这种方式大大减少了临时文件个数,同时采用PartitionedAppendOnlyMap和PartitionedPairBuffer的数据结构也可以大大减小溢写磁盘带来内存的开销。
6.Spark 1.4 引入Tungsten-Sort Based Shuffle
Tungsten-Sort是使用新的内存管理模型通过指针数组直接对序列化的二进制数据进行排序,省去了数据处理过程中不必要的序列化与反序列化的开销,这种模式下需要序列化器支持对已经序列化的对象重定位,具体的在下面会提到。
7.Spark 1.6 Tungsten-sort并入Sort Based Shuffle
在这个版本中,将Tungsten-sort加入到了SortBasedShuffle中,并且将Shuffle Writer分为SortShuffleWriter、BypassMergeSortShuffleWriter、Unsafe ShuffleWriter三种,SortShuffleManager会根据ShuffleMap文件分区数、是否有Mapside排序、是否需要对数据排序、是否支持Relocation自动选择Shuffle方式。
8.Spark 2.0 Hash Based Shuffle退出历史舞台
到2.0版本,Hash Based Shuffle已经从API中移出。
Spark目前的Shuffle实现方式解析
接下来将结合Spark生产环境2.1.1版本的源码对Spark的Shuffle实现进行分析。
Spark Shuffle的体系结构
2.0版本后HashShuffleManager已经从shuffle包下面删除,只通过SortShuffle Manager来管理Shuffle,将Writer分为三种,而Reader则统一实现。
Shuffle Writer实现
接下来看一下在Sort Shuffle Manager中get Writer方法怎样选择具体的Writer:
上面的代码原来是根据不同的ShuffleHandle来进行选择的,因此还需要知道不同类型handle的生成条件,要而ShuffleHandle在SortShuffleManager中注册:
在register Shuffle里面会根据不同条件选取ShuffleHandle。
首先是shouldBypassMergeSort方法,它对应BypassMergeSortShuffle Handle:
可以看出它的条件是:没有Mapside Combine&&Reduce端分区个数<= spark.shuffle.sort.by pass MergeThreshold( 默认200 )。回到registerShuffle,接下来是canUseSerializedShuffle,它对应SerializedShuffleHandle:
它的条件是:
RDD内部对象的序列化方式支持Relocation&&该任务没有聚合操作&&Reduce分区数 < 2^24。
再回到register Shuffle,当前面两个方法都返回false时选择BaseShuffleHandle。
根据以上分析,可以得出SortShuffleManager选择Writer的流程:
ByPassMergeSortShuffleWriter
从名字就可以看出来,这种Write的过程不需要进行Merge和Sort,这是由于在Sort Shuffle的早期,Map端都是采用PartitionedAppendOnlyMap或PartitionedPair Buffer的数据结构来对数据进行聚合、排序、溢写,这种方式对于大量ReduceTask环境下map端有聚合或者需要排序的任务来说会非常高效,但是通常对于一个ReduceTask数量不多(小于默认值200)并且不需要排序和map端聚合的任务来说就,反而类似HashBasedShuffle方式更高效。
这种方式跟最早的HashBasedShuffle相似,每个DiskBlockObjectWriter分配一个输出缓冲区,对Map里的数据根据分区规则通过不同的Writer直接追加写入相应的临时文件,每个文件里是同一个Reduce分区的数据,与HashBasedShuffle不同是在最后会把每个Map输出的文件进行合并,最终一个MapTask只生成一个数据文件和一个索引文件,数据文件中按照Reduce分区ID进行排序。
Write方法
ByPassMergeSortShuffleWriter继承了ShuffleWriter,核心是write算法,用来对mapTask的输入进行Write。
WritePartitionedFile方法
看下writePartitionedFile的实现逻辑,它会将之前的多个临时文件进行合并,由于每个临时文件里面的数据都属于同一个Reduce分区,直接将这些文件按顺序copy到目标临时文件,最后会返回每个文件的大小,用来记录index:
WriteIndexFileAndCommit方法
接下来是生成indexfile 的 writeIndexFileAndCommit,将writePartitionedFile返回的每个分区文件大小的数组作为索引数据,写入索引文件,核查数据是否完整,然后更名为正式文件:
到这里,BypassMergeSortShuffleWriter的write过程完毕,为每个reduce生成了一个dataFile和IndexFile,其中dataFile中仅按照分区ID进行了排序。
UnsafeShuffleWriter
在说Tungsten-Sort之前先要提一下 TungstenProject,TungstenProject 是 Databricks 公司提出的对Spark优化内存和CPU使用的计划,其目的在于榨干硬件,让spark充分利用申请到的资源。该计划设计了一套新的统一内存管理机制,Executionmemory 和storagememory之间可以共享空间,可以申请堆外内存,堆外内存通过 sun.misc.Unsafe公共API 实现,并且将堆内内存和堆外内存都封装成很多个MemoryBlock,其结构中有一个obj标识和一个64位的地址,堆外内存obj为null,地址直接对应物理地址,堆内内存obj为JVM对象的基地址,前13位表示page number,后51位表示在page内的偏移,
Unsafe Shuffle Writer开启条件是对象序列化方式支持Relocation(即序列化的数据不需要反序列化,对其元数据进行排序后,在指定位置能读取该数据,目前只有Kryo序列化支持)、 任务中没有聚合、Reduce分区小于2^24个(24位存储分区ID)。
该Writer通过ShuffleExternalSorter将序列化数据存到MemoryBlock里面,同时将记录的Reduce分区ID、数据地址存储到ShuffleInMemorySorter的指针数组中,当ShuffleInMemorySorter存储条数达到阈值或者申请不到内存page时,对Shuffle InMemorySorter里面的数据按照分区ID进行排序然后根据地址按顺序拿到数据进行spill,最后将多个spill文件按分区ID进行全局排序,并根据排序后的指针地址顺序获取数据进行spill。
Write方法
Write方法,调用insertRecordIntoSorter循环逐条处理数据:
insertRecordIntoSorter方法
insertRecordIntoSorter方法中,调用insertRecord方法,通过Serialization Stream将数据插入ShuffleExternalSorter:
进一步通过writeSortedFile实现,它通过ShuffleInMemorySorter的getSorted Iterator返回一个排好序的迭代器,然后按顺序从该迭代器每个记录里解析出分区id和数据地址,将分区id和地址里的数据写入临时文件。
insertRecord方法
Spill方法
溢写过程在spill中:
writeSortedFile方法
进一步通过writeSortedFile实现,它通过ShuffleInMemorySorter的getSortedIterator返回一个排好序的迭代器,然后按顺序从该迭代器每个记录里解析出分区id和数据地址,将分区id和地址里的数据写入临时文件。
CloseAndWriteOutput方法
Write方法完了以后会调用closeAndWriteOutput来合并文件和生成索引文件:
SortShuffleWriter
SortShuffleWriter 使用 ExternalSorter 来处理mapTask的数据以及merge分区文件, 官方对该类的描述如下:
这个类用于对一些(K, V)类型的key-value对进行排序或者merge,生的结果是一些(K, C)类型的key-combiner对。combiner就是对同样key的value进行合并的结果。
它首先使用一个Partitioner来把key分到不同的partition,同时可以进行map端聚合,按照partitionID排序,在需要溢写磁盘时候,如果用户有定义排序的话,就把每个partition内部的key按照一给定的Comparator来进行排序。External Sorter 中有两种数据结构PartitionedAppendOnlyMap、PartitionedPairBuffer,会根据是否有mapsideCombine来决定使用哪种数据结构处理数据。当有mapside Combine时采用 PartitionedAppendOnlyMap,否则使用PartitionedPair Buffer。
数据结构
也就是说,整个类可以根据构造方法传入的聚合函数和比较排序函数对数据进行聚合和排序,当有mapside端聚合时sorter内部使用PartitionedAppendOnlyMap,否则使用PartitionedPairBuffer,先介绍下这两个类:
PartitionedPairBuffer 和 PartitionedAppendOnlyMap首先会单纯的更新或者插入mapTask过来的数据,在spill的时候会调用destructiveSortedWritable PartitionedIterator方法会集合中的数据进行排序,如果算子中有排序则会按照(partitionID,key),否则只按照partitionID排序,然后返回一个将数据写到Disk BlockObjectWriter的迭代器。
Write方法
SortShuffleWriter 继承了ShuffleWriter,核心是 write 方法,用来获取mapTask的输出来进行ShuffleWrite:
InsertAll方法
首先看生成分区临时文件的 spill的过程,在sorter.insert All(records)里面:
maybeSpillCollection方法
maybeSpill方法
在maybeSpill中满足条件则溢写磁盘。
spill的条件:写磁盘前会一直尝试申请更多的内存,直到申请不到足够内存。
Spill方法
partitionedDestructiveSortedIterator方法
partitionComparator方法
writePartitionedFile方法
insertAll执行完以后,磁盘溢写完毕。接下来是合并临时文件,回到Sort Shuffle Writer中的writePartitionedFile.
Merge方法
下面是merge实现逻辑:
将返回的迭代器在 writePartitionedFile 中写入磁盘 ,至此生成dataFile完毕。
接下来是index file生成 writeIndexFileAndCommit 跟BypassMergeSort Shuffle Writer调用的一样的类的方法,原理一样。
到这里,SortShuffleWriter 的write过程完毕,为每个reduce生成了一个dataFile和IndexFile,其中dataFile中按照分区ID进行了排序,如果定义了mapside Combine且定义了排序才会对 (partition ID,Key) 二元组排序。
总结
上面介绍了三种ShuffleWriter的主要实现步骤,他们在中间的排序、聚合、spill逻辑各不相同,相同的是最终都是每个MapTask生成一个数据文件和一个索引文件:
1.在数据量小,Reduce分区数<=spark.shuffle.sort.by pass Merge Threshold( 默认200 ) 并且任务中没有Map端聚合操作时,采用类似Hase Based Shuffle的实现方式,不会对数据进行sort和merge,而是直接将每个Map Task的数据按Key值得到Reduce分区ID,然后每个分区生成一个DiskObjectWriter将该条数据Append到这个MapTask对应分区的临时文件里面,不同的是最后会对MapTask生成的临时文件进行合并,每个MapTask生成一个数据文件和一个索引文件。
2.当对象的序列化方式支持Recolation并且Reduce分区数小于2^24时,采用UnsafeSortshuffle,这种方式省去了将接收到的数据进行排序时要经历的反序列化然后再序列化的过程,直接将二进制数据写入MemoryBlock,同时将该数据的Reduce分区Id以及数据地址写入inMemorySorter的指针数组,排序时只对指针数组排序,spill时根据地址指针去获取数据。
3.最后一种SortShufleWriter,当Shuffle不满足上述两种条件时采用。内部采用ExternalSorter来溢写数据。其中,当没有map端聚合时,ExternalSorter内部采用PartitionedPairBuffer来接纳数据,这个时候即使job输出指定了排序逻辑,在shuffle write时并不会预排序,PartitionedPairBuffer的结构特点使得数据插入后是分区Id有序的,达到溢写磁盘条件时直接将Buffer内数据写出;当map端有聚合时,会采用PartitionedAppendOnlyMap来接纳数据,可以对key值进行跟新来完成聚合操作,并且这个时候job输出指定了排序逻辑的话,spill的时候数据会按照(Id,Key)的二元组进行排序。
Shuffle Reader实现
ShuffleRead用来承接ShuffleWrite,将MapTask端的数据拉取过来进行进一步的处理。SortBbasedShuffle的ShuffleRead实现方式只有一种:BlockStoreShuffle Reader。
Block Store Shuffle Reader
它的实现结构很简单,就一个read方法,在其中通过一个Shuffle Block Fetcher Iterator返回需要拉取的数据的迭代器。
ShuffleBlockFetcherIterator是一个拉取多个block数据的迭代器,Block可以是本地的也可以是远端的,它会返回一个(Block ID,Input Stream)二元组的迭代器。如果算子中有聚合则会对拉来的数据进行聚合,如果算子中有排序则会对拉来的数据进行排序,使用SortShuffleWriter中提到过的ExternalSorter。
Read方法
获取本文源码,请后台联系小助手